FRETBursts regression test

This notebook implements a simple regression testing for FRETBursts.

This notebook defines a series of test-cases that are executed with the current version of FRETBursts. The result of each test is compared with results saved by a previous commit (compare_commit). By saving the test results (setting save_reference = True), it is possible to use the current commit as a compare_commit in future revisions.

Configuration


In [ ]:
# If True save this run as reference for future comparison
save_reference = True

In [ ]:
# Commit to use for test comparison
compare_commit = 'f5faeae' # previous saved test point (or None to bootstrap)

In [ ]:
# Load FRETBursts into the notebook namespace (--nogui: no GUI backend)
%run load_fretbursts.py --nogui

In [ ]:
# Base folder containing the measurement data and the saved test results
data_dir = os.path.abspath(u'../../data/') + '/'
data_dir

Comparison functions


In [ ]:
import os
test_save_dir = ''
if save_reference:
    test_save_dir = data_dir + 'test/' + git.get_last_commit() + '/'
    if not os.path.exists(test_save_dir):
        os.mkdir(test_save_dir)
    print 'Saving test results in:', test_save_dir

test_load_dir = ''
if compare_commit is not None:
    test_load_dir = data_dir + 'test/' + compare_commit + '/'
    if not os.path.exists(test_load_dir):
        raise ValueError('Path %s not found, choose a different commit.' % test_load_dir)
    print 'Loading test results from:', test_load_dir

In [ ]:
import cPickle as pickle

In [ ]:
# Map new-style Ph_sel photon-selection objects to the old string labels
# used in references saved by older FRETBursts versions.
map_ph_sel = {Ph_sel(Dex='Dem'): 'D', Ph_sel(Dex='Aem'): 'A',
              Ph_sel('all'): 'DA', Ph_sel(Aex='Aem'): 'AA'}
# Attributes renamed between versions: current name -> old saved name
map_renames = dict(leakage='BT', rate_th='Th', leakage_corrected='bt_corrected', dir_ex='dir')
# Attributes added after old references were saved: missing in d2 is only a warning
new_names = ['bg_fun_name' , 'bg_auto_th', 'bg_ph_sel', 'rate_da', 'bg_da', 'dir_ex', 'dir_ex_corrected']

def compare_data(d1, d2, verbose=True, debug=False, exclude_ph_times=False):
    """Compare two Data() objects for equality (useful for regression test).
    """
    if d2 is None:
        print ' * WARNING: Saved test not found, skipping comparison.'
        return True
        
    equal = True
    for key in d1:
        if verbose: 
            print "Testing %s (%s) ... " % (key, type(d1[key])),
            
        if callable(d1[key]):
            # Skip function comparison
            if verbose: print "skipping (function)"
            continue
            
        if exclude_ph_times and key == 'ph_times_m':
            if verbose: print "[TEST SKIPPED]"
            continue
        
        # Detect variable renames or recent additions
        if key not in d2:
            if key in map_renames and map_renames[key] in d2:
                d2[key] = d2[map_renames[key]]
            elif key.endswith('_err') or key.startswith('fit_E_') or \
                 key.startswith('bg_th_us') or key in new_names:
                print "WARNING: Attribute '%s' in d1 is missing in d2" % key
                continue
            else:    
                print "ERROR\n * Attribute '%s' in d1 is missing in d2" % key
                equal = False
                continue

        if d1[key] is None:
            if not (d2[key] is None):
                equal = False
                print "ERROR\n * Attribute '%s' is None d1 but %s in d2" % \
                        (key, d2[key])
            elif verbose: 
                print 'OK (None)'
            continue
        
        # Detect new Ph_sel type and compare to old str representation
        if type(d1[key]) is Ph_sel and type(d2[key]) is str:
            if map_ph_sel[d1[key]] != d2[key]:
                #equal = False
                print "ERROR\n * Attribute ph_sel does not match: '%s', '%s'" % \
                        (d1[key], d2[key])
                print "    >>>> Error amended due to bug in old versions."
            continue
            
        if type(d1[key]) is Ph_sel and type(d2[key]) is Ph_sel:
            equal = d1[key] == d2[key]
            if not equal:
                print  "ERROR\n * Attribute '%s' do not match: '%s', %s" % \
                        (d1[key], d2[key])
            continue
                    
        # Test if the attributes have the same type
        if not (type(d1[key]) == type(d2[key])):
            equal = False
            print "ERROR\n * Attribute '%s' has type %s in d1 but %s in d2" % (key,
                    type(d1[key]), type(d2[key]))
            continue
        
        if np.isscalar(d1[key]):
            scalar1, scalar2 = d1[key], d2[key]
            if key == 'fname':
                scalar1 = os.path.basename(os.path.abspath(scalar1))
                scalar2 = os.path.basename(os.path.abspath(scalar2))
            if scalar1 != scalar2:
                print("ERROR\n d1.{k} and d2.{k} differ (scalar).".format(k=key))
                equal = False
            elif verbose: 
                print 'OK (scalar)'
            continue
        
        # If the attribute is an empty list
        if type(d1[key]) is list and len(d1[key]) == 0:
            if not (type(d2[key]) is list and len(d2[key]) == 0):
                print "ERROR\n * Attribute '%s' is an empty list in d1 but not in d2"
                equal = False
            elif verbose: 
                print 'OK (empty list)'
            continue
        
        # If the attribute is a dict
        if type(d1[key]) is dict:
            dict_comp = []
            for sub_key in d1[key]:
                if sub_key in d2[key]:
                    d2_key_subkey = d2[key][sub_key]
                else:
                    if sub_key == Ph_sel(Aex='Dem'):
                        # Ignore missing key
                        print " * WARNING: Ignoring missing key %s in d2['%s']" %\
                                (sub_key, key)
                        continue
                    
                    # Try to map new Ph_sel keys to old-style strings
                    if type(sub_key) is Ph_sel:
                        ph_map = {Ph_sel(Dex='Dem'): 'D', Ph_sel(Dex='Aem'): 'A',
                                  Ph_sel('all'): 'DA', Ph_sel(Aex='Aem'): 'AA'}
                        d2_key_subkey = d2[key][ph_map[sub_key]]
                    else:
                        print "ERROR\n * The dict '%s' has the key '%s' in d1 but not in d2."
                        equal = False
                        continue
                if type(d1[key][sub_key]) == np.ndarray:
                    dict_comp.append(np.allclose(d1[key][sub_key], d2_key_subkey))
                else:
                    dict_comp.append(d1[key][sub_key] == d2[key][sub_key])
            equal_dict = np.alltrue(dict_comp)
            if not equal_dict:
                equal = False
                print "ERROR\n * Attribute '%s' (dict) differs between d1 and d2"
            elif verbose: 
                print 'OK (dict)'
            continue
        
        # Detect cases of espected changed values
        if key == 'bg_th_us_user':
            if not d1['ALEX']:
                # Test only the first 3 elemenents: 
                # other elements, if present, are not significant
                d1[key] = d1[key][:3]  
                d2[key] = d2[key][:3]
            equal = np.array_equal(d1[key], d2[key])
            if not equal:
                print  "ERROR\n * Attribute '%s' does not match: '%s' vs '%s'" % \
                        (key, d1[key], d2[key])
            continue
            
        # Now the attribute should only be a list of arrays (one per channel)
        try:
            assert (len(d1[key]) == d1['nch']) and (len(d2[key]) == d2['nch'])
        except AssertionError:
            equal = False
            print "ERROR\n * Attribute '%s' has length %d in d1 and length %d in d2." % \
                    (key, len(d1[key]), len(d2[key]))
            print "  They should both be nch (%d)" % d1['nch']
            continue

        # Test the multi-ch fields (list of arrays)
        test_res = np.zeros(d1['nch'], dtype=bool)
        for ich, (val1, val2) in enumerate(zip(d1[key], d2[key])):
            if type(val1) == type(None):
                if debug:
                    print ('NA1 {} ({}) {}'.format(key, type(val1), val1))
                    print ('NA2 {} ({}) {}'.format(key, type(val2), val2))
                test_res[ich] = (val1 == val2)
            else:
                if debug:
                    print ('A1 {} ({}) {}'.format(key, type(val1), val1))
                    print ('A2 {} ({}) {}'.format(key, type(val2), val2))
                test_res[ich] = np.allclose(val1, val2)
                if not test_res[ich] and np.isnan(val1).any():
                    print ' * WARNING: Testing only non-NaN values in %s[%d]' % (key, ich)
                    test_res[ich] = np.allclose(val1[~np.isnan(val1)], val2[~np.isnan(val1)])
        if not test_res.all():
            print "ERROR\n d1.%s and d2.%s differ (non-scalar)." % \
                    (key, key)
            print "    Test mask: %s " % (test_res,)
            equal = False
        elif verbose: 
            print 'OK (multi)'
        
    return equal

In [ ]:
def save_test(name, d, dir_=test_save_dir, exclude_ph_times=True):
    print 'Saving test to:', test_save_dir
    d_save = dict(d)
    
    # Remove functions
    for key in d:
        if callable(d_save[key]):
            d_save.pop(key)
    
    if exclude_ph_times:
        d_save.pop('ph_times_m')
        
    with open(dir_+TEST+'.pickle', 'wb') as f:
        pickle.dump(d_save, f, protocol=2)

def load_test(name, dir_=test_load_dir):
    print 'Loading test from:', test_load_dir
    file_name = dir_ + TEST + '.pickle'
    if not os.path.isfile(file_name):
        print ' - Saved test not found.'
        return None
    with open(file_name, 'rb') as f:
        d2 = pickle.load(f)
    return d2

Dataset used for the tests


In [ ]:
# Data file used by all the test-cases below
fn = "7d_New_150p_320mW_steer_3.dat"

Find the full file name:


In [ ]:
# Full path of the data file inside data_dir
fname = data_dir + fn
fname

List of tests

Load and process the data:


In [ ]:
# Test 1: load the multispot data, set leakage/gamma, estimate background
# (exp_fit, 20 s windows) and run a burst search on all photons.
TEST = 'test1'
d = loader.multispot8(fname=fname)
d.add(leakage=0.044, gamma=1.)
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us=200)
d.burst_search_t(L=10, m=10, F=6)
d_test = d

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 2: burst search restricted to Ph_sel(Dex='Dem') photons
TEST = 'test2'
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us=200)
d.burst_search_t(L=10, m=10, P=None, F=6, ph_sel=Ph_sel(Dex='Dem'))
d_test = d

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Inspect the photon selection of current vs saved data
d_test['ph_sel'], d_saved['ph_sel']

In [ ]:
# Test 3: burst search on Ph_sel(Dex='Aem') photons followed by burst
# fusion with ms=-1
TEST = 'test3'
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us=200)
d.burst_search_t(L=10, m=10, F=6, ph_sel=Ph_sel(Dex='Aem'))
d.fuse_bursts(ms=-1)
d_test = d

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 4: shorter background window (5 s), L=20 burst search on all
# photons, burst fusion with ms=1
TEST = 'test4'
d.calc_bg(bg.exp_fit, time_s=5, tail_min_us=200)
d.burst_search_t(L=20, m=10, F=6, ph_sel=Ph_sel('all'))
d.fuse_bursts(ms=1)
d_test = d

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 5: burst selection by size (gamma-corrected) then ML Poisson E fit
TEST = 'test5'

E1_raw = 0.65
gamma = 0.45
d.burst_search_t(L=10, m=10, F=6, ph_sel=Ph_sel(Dex='Aem'))
ds = Sel(d, select_bursts.nda, th1=20, gamma1=gamma)
ds.update_gamma(1.)
ds.fit_E_ML_poiss(E1=E1_raw, method=2)
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 6: E fit with a histogram-based gaussian fit, size-weighted
TEST = 'test6'

ds.fit_E_generic(E1=E1_raw, fit_fun=bl.gaussian_fit_hist, weights='size', gamma=gamma)
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, debug=False, exclude_ph_times=True)

In [ ]:
# Test 7: fit_E_m with size weights
TEST = 'test7'

ds.fit_E_m(E1=E1_raw, weights='size', gamma=gamma)
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 8: fit_E_m without weights
TEST = 'test8'

ds.fit_E_m(E1=E1_raw, weights=None, gamma=gamma)
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 9: two-gaussian EM fit after updating gamma
# When reference is d1e37 this fails because TEST9 cannot be saved for that reference
TEST = 'test9'

ds.update_gamma(gamma)
ds.fit_E_two_gauss_EM(weights='size', gamma=gamma)
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 10: E fit with a CDF-based gaussian fit
TEST = 'test10'

ds.fit_E_generic(E1=E1_raw, fit_fun=bl.gaussian_fit_cdf)
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 11: automatic background threshold and burst search with max_rate
TEST = 'test11'
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us='auto', F_bg=1.7)
d.burst_search_t(L=10, m=10, F=6, ph_sel=Ph_sel('all'), max_rate=True)
d_test = d

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:
# Test 12: burst selection by size, then per-burst max rate on
# Ph_sel(Dex='Aem') photons
TEST = 'test12'
d.burst_search_t(L=10, m=10, F=6)
ds = Sel(d, select_bursts.size, th1=30)
print ds.num_bu()
ds.calc_max_rate(m=5, ph_sel=Ph_sel(Dex='Aem'))
d_test = ds

if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)

In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)

In [ ]:


In [ ]:
# Print summary statistics of the last tested Data object
d_test.stats()

In [ ]:
# If execution reaches this cell, all the asserts above passed
print 'OK'

In [ ]:


In [ ]: